""" 
    Author: Siren
    Time: 2023/6/20 15:31 
    File: PolFlagModel.py 
    IDE: PyCharm 
"""
# !/usr/bin/env python
# -*- coding:utf-8 -*-
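# 1 - Anaconda virtual environment commands for Python
# List the existing virtual environments: conda env list
# Create a virtual environment: conda create -n pyspark_env python==3.7.10
# Activate a virtual environment: conda activate pyspark_env
# Environment setup (imports)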
from pyspark import SparkContext
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import udf

from index_development.BaseModel.BaseModelAbstract import BaseModel


class PolFlagModel(BaseModel):
    """Tag model that maps each user's ``pol_flag`` value to a tag id.

    Per the original author's notes this is the "insurance validity status"
    tag. The surrounding pipeline (loading the input DataFrames, calling
    ``compute`` and handling its result) lives in ``BaseModel``, which is not
    visible here.
    """

    def getTagId(self):
        """Return the id (10) of the tag this model is responsible for."""
        return 10

    def compute(self, esDF: DataFrame, fiveDF: DataFrame, sc: SparkContext, spark: SparkSession):
        """Build the user-to-tag DataFrame for the policy-flag tag.

        Args:
            esDF: User data; must contain the ``user_id`` and ``pol_flag``
                columns.
            fiveDF: Tag rules; must contain the ``rule`` and ``id`` columns.
                Each ``rule`` is matched against ``str(pol_flag)``.
            sc: Spark context, used to broadcast the rule dictionary.
            spark: Spark session; not used by this method.

        Returns:
            A DataFrame with the columns ``userId`` and ``tagsId``. ``tagsId``
            is null for rows whose ``pol_flag`` matches no rule.
        """
        print('=============将5级标签规则转化为字典==================')
        # The rule table is small, so it is collected to the driver as a
        # {rule: tag id} dictionary.
        fiveRuleDict: dict = fiveDF.rdd.map(lambda row: (row['rule'], row['id'])).collectAsMap()
        print(fiveRuleDict)
        # Broadcast once so executors share a single read-only copy.
        ruleBroadcast = sc.broadcast(fiveRuleDict)

        # udf() without an explicit return type declares a string column.
        @udf()
        def polFlagToTags(flag):
            # .get() instead of [] so that a null or unmapped pol_flag yields
            # a null tag rather than a KeyError that aborts the whole job.
            return ruleBroadcast.value.get(str(flag))

        print('===========匹配用户数据和字典数据,打标签===============')
        newDF = esDF.select(
            esDF['user_id'].alias('userId'),
            polFlagToTags(esDF['pol_flag']).alias('tagsId'),
        )
        newDF.show()
        return newDF

if __name__ == '__main__':
    # Script entry point: build the model and run the pipeline via the
    # execute() method it inherits.
    PolFlagModel().execute()